add WasbDagBundle to load Dags from Azure Blob Storage#67016
Conversation
|
I added a note in the PR about enhancing the docs. I think a couple of things that I'd like to know if I were a user setting this up from scratch.
|
|
These changes offer a nice starting point: https://github.com/apache/airflow/pull/66993/changes |
Thanks for the review. I'll add provider-level bundle docs for WASB following the pattern in #66993 (providers/microsoft/azure/docs/bundles/index.rst, Guides entry, cross-link to core Dag bundles), and cover auth, managed-identity permissions, container/prefix setup, networking, and reusing the same wasb Connection as in Dags. I'll push an update shortly. |
yuseok89
left a comment
There was a problem hiding this comment.
Left a couple of small comments inline.
Please take a look when you have a moment.
|
@Nishieee — There are 4 unresolved review thread(s) on this PR from @dominikhei, @jroachgolf84, @yuseok89. Could you either push a fix or reply in each thread explaining why the feedback doesn't apply? Once you believe the feedback is addressed, mark the thread as resolved so the reviewer isn't re-pinged needlessly. Thanks! Note: This comment was drafted by an AI-assisted triage tool and may contain mistakes. Once you have addressed the points above, an Apache Airflow maintainer — a real person — will take the next look at your PR. We use this two-stage triage process so that our maintainers' limited time is spent where it matters most: the conversation with you. |
0144712 to
a16419b
Compare
|
@Nishieee — There are 2 unresolved review thread(s) on this PR, and you have engaged with each one (post-review commits and/or in-thread replies). Could you confirm whether you believe the feedback is fully addressed and the PR is ready for maintainer review confirmation? If yes, reply here (a short "yes / ready" is fine) and an Apache Airflow maintainer will pick the PR up from the review queue on the next sweep. If you are still working on a thread, please reply with what is outstanding so the threads stay unresolved on purpose. Note: This comment was drafted by an AI-assisted triage tool and may contain mistakes. Once you have addressed the points above, an Apache Airflow maintainer — a real person — will take the next look at your PR. We use this two-stage triage process so that our maintainers' limited time is spent where it matters most: the conversation with you. Drafted-by: Claude Code (Opus 4.7); reviewed by @potiuk before posting |
yes |
jroachgolf84
left a comment
There was a problem hiding this comment.
Nice work, all the changes LGTM. I'm not a maintainer, so my review doesn't count towards your required approvals. But nice job!
|
@eladkal - can you take a look at this one? I know that you were involved on the initial issue. |
…e/bundles/wasb.py Co-authored-by: Yuseok Jo <yuseok89@gmail.com>
2d3b4e3 to
51d73e9
Compare
|
Probably needs 2nd eye from someone more familiar witht he bundle area |
| self.prefix = prefix | ||
| self.wasb_dags_dir: Path = self.base_dir | ||
|
|
||
| log = structlog.get_logger(__name__) |
There was a problem hiding this comment.
I would try extending WasbDagBundle with LogginMixin instead of instantiating log yourself. If in the future we would for example say we use some other kind of logger, the WasbDagBundle would then be also updated without any intervention needed.
There was a problem hiding this comment.
Hmm I see others do it like that as well, maybe something to refactor in another PR? Maybe BaseDagBundle should extend the LogginMixin.
| super().initialize() | ||
|
|
||
| @property | ||
| def wasb_hook(self): |
There was a problem hiding this comment.
This could simply become a cached_property I think, no need to keep self._wasb_hook.
| account_url = self.wasb_hook.blob_service_client.url | ||
| url = f"{account_url.rstrip('/')}/{self.container_name}" | ||
| if self.prefix: | ||
| url += f"/{self.prefix}" |
There was a problem hiding this comment.
return f"{url}/{self.prefix}"
| self.check_for_variable_type("container", container, ContainerClient) | ||
| container = cast("ContainerClient", container) | ||
|
|
||
| for blob in container.list_blobs(name_starts_with=prefix or None): |
There was a problem hiding this comment.
Couldn't this be avoided if prefix would be None by default instead of empty string?
Summary
Adds a Dag bundle for Azure Blob Storage so Dags can be loaded from a container (with optional prefix), similar to
S3DagBundleandGCSDagBundle. IntroducesWasbDagBundle, extendsWasbHookwith container checks andsync_to_local_dir, registers the bundle in provider metadata, documents it indag-bundles.rst, and adds unit tests.Manual verification: Tested with Breeze against a real Azure storage account:
wasbconnection,dag_processor.dag_bundle_config_listpointing atWasbDagBundle, Dag parsed from blob and visible in the UI. PR includes screenshots (Azure container + Airflow Dags list).closes: #66987
Was generative AI tooling used to co-author this PR?
Generated-by: Cursor following the guidelines